使用协程和 Flow 简化 API 设计
https://youtu.be/OmHePYcHbyQ
检查现有协程适配器
Future 类型
对于 future 类型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。这里提到的并不是全部,您可以在线搜索以确定是否存在适用于您的 future 类型的适配器。
CompletableFuture
https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
ListenableFuture
https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
// 等待 CompletionStage 的执行完成而不阻塞线程
suspend fun <T> CompletionStage<T>.await(): T
// 等待 ListenableFuture 的执行完成而不阻塞线程
suspend fun <T> ListenableFuture<T>.await(): T
使用这些函数,您可以摆脱回调并挂起协程直到 future 的结果被返回。
Reactive Stream
RxJava
https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx3
Java 9 API
https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-jdk9
响应式流库
https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-reactive
// 将给定的响应式 Publisher 转换为 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
这些函数将响应式流转换为了 Flow。
Android 专用 API
对于 Jetpack 库或 Android 平台 API,您可以参阅 Jetpack KTX 库列表。现有超过 20 个库拥有 KTX 版本,构成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。
Jetpack KTX 库
https://developer.android.google.cn/kotlin/ktx/extensions-list
回调
后台线程任务运行指南 https://developer.android.google.cn/guide/background/threading
创建您自己的适配器
如果没有找到适合您用例的适配器,更直接的做法是自己编写适配器。对于一次性异步调用,可以使用 suspendCancellableCoroutine API;而对于流数据,可以使用 callbackFlow API。
suspendCancellableCoroutine https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/suspend-cancellable-coroutine.html callbackFlow https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html
Fused Location Provider https://developers.google.cn/android/reference/com/google/android/gms/location/FusedLocationProviderClient.html
一次性异步调用
注意: 这一 API 返回值为 Task,并且已经有了对应的适配器。出于学习的目的,我们用它作为范例。
Fused Location Provider
https://developers.google.cn/android/reference/com/google/android/gms/location/FusedLocationProviderClient.html
getLastLocation
https://developers.google.cn/android/reference/com/google/android/gms/location/FusedLocationProviderClient#getLastLocation()
最后已知位置
https://developer.android.google.cn/training/location/retrieve-current
Task
https://developers.google.cn/android/reference/com/google/android/gms/tasks/Task
适配器
https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-play-services/src/Tasks.kt
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
由于这是一个一次性异步操作,我们使用 suspendCancellableCoroutine 函数: 一个用于从协程库创建挂起函数的底层构建块。
Continuation https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-continuation/
我们使用可以添加到 getLastLocation 方法中的回调来在合适的时机恢复协程。参见下面的实现:
// FusedLocationProviderClient 的扩展函数,返回最后已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =
// 创建新的可取消协程
suspendCancellableCoroutine<Location> { continuation ->
// 添加恢复协程执行的监听器
lastLocation.addOnSuccessListener { location ->
// 恢复协程并返回位置
continuation.resume(location)
}.addOnFailureListener { e ->
// 通过抛出异常来恢复协程
continuation.resumeWithException(e)
}
// suspendCancellableCoroutine 块的结尾。这里会挂起协程
//直到某个回调调用了 continuation 参数
}
注意: 尽管协程库中同样包含了不可取消版本的协程构建器 (即 suspendCoroutine),但最好始终选择使用 suspendCancellableCoroutine 处理协程作用域的取消及从底层 API 传播取消事件。
suspendCoroutine https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/suspend-coroutine.html suspendCancellableCoroutine https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/suspend-cancellable-coroutine.html
suspendCancellableCoroutine 原理
suspendCancellableCoroutine https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CancellableContinuation.kt#L305 suspendCoroutineUninterceptedOrReturn https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/src/kotlin/coroutines/intrinsics/Intrinsics.kt#L41 CancellableContinuation https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CancellableContinuation.kt 实现 https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
您可以通过我在下面代码片段 (原版实现) 中的注释来了解发生了什么:
原版实现 https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/CancellableContinuation.kt#L305
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
// 获取运行此挂起函数的协程的 Continuation 对象
suspendCoroutineUninterceptedOrReturn { uCont ->
// 接管协程。Continuation 已经被拦截,
// 接下来将会遵循 CancellableContinuationImpl 的生命周期
val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
/* ... */
// 使用可取消 Continuation 调用代码块
block(cancellable)
// 挂起协程并且等待 Continuation 在 “block” 中被恢复,或者在 “block” 结束执行时返回结果
cancellable.getResult()
}
流数据
如果我们转而希望用户的设备在真实的环境中移动时,周期性地接收位置更新 (使用 requestLocationUpdates 函数),我们就需要使用 Flow 来创建数据流。理想的 API 看起来应该像下面这样:
requestLocationUpdates
https://developers.google.cn/android/reference/com/google/android/gms/location/FusedLocationProviderClient#requestLocationUpdates(com.google.android.gms.location.LocationRequest,%20com.google.android.gms.location.LocationCallback,%20android.os.Looper)
Flow
https://kotlinlang.org/docs/reference/coroutines/flow.html
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
为了将基于回调的 API 转换为 Flow,可以使用 callbackFlow 流构建器来创建新的 flow。callbackFlow 的 lambda 表达式的内部处于一个协程的上下文中,这意味着它可以调用挂起函数。不同于 flow 流构建器,channelFlow 可以在不同的 CoroutineContext 或协程之外使用 offer 方法发送数据。
callbackFlow https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html flow https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html offer https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
通常情况下,使用 callbackFlow 构建流适配器遵循以下三个步骤:
创建使用 offer 向 flow 添加元素的回调;
注册回调;
等待消费者取消协程,并注销回调。
将上述步骤应用于当前用例,我们得到以下实现:
// 发送位置更新给消费者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
// 创建了新的 Flow。这段代码会在协程中执行。
// 1. 创建回调并向 flow 中添加元素
val callback = object : LocationCallback() {
override fun onLocationResult(result: LocationResult?) {
result ?: return // 忽略为空的结果
for (location in result.locations) {
try {
offer(location) // 将位置发送到 flow
} catch (t: Throwable) {
// 位置无法发送到 flow
}
}
}
}
// 2. 注册回调并通过调用 requestLocationUpdates 获取位置更新。
requestLocationUpdates(
createLocationRequest(),
callback,
Looper.getMainLooper()
).addOnFailureListener { e ->
close(e) // 在出错时关闭 flow
}
// 3. 等待消费者取消协程并注销回调。这一过程会挂起协程,直到 Flow 被关闭。
awaitClose {
// 在这里清理代码
removeLocationUpdates(callback)
}
}
callbackFlow 内部原理
channel https://kotlinlang.org/docs/reference/coroutines/channels.html 队列 https://en.wikipedia.org/wiki/Queue_(abstract_data_type)
awaitClose 内部原理
有趣的是,awaitClose 内部使用的是 suspendCancellableCoroutine。您可以通过我在以下代码片段中的注释 (查看原始实现) 一窥究竟:
原始实现
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/Produce.kt#L49
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
...
try {
// 使用可取消 continuation 挂起协程
suspendCancellableCoroutine<Unit> { cont ->
// 仅在 Flow 或 Channel 关闭时成功恢复协程,否则保持挂起
invokeOnClose { cont.resume(Unit) }
}
} finally {
// 总是会执行调用者的清理代码
block()
}
}
复用 Flow
conflate https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
shareIn https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/share-in.html
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
...
}.shareIn(
// 让 flow 跟随 applicationScope
applicationScope,
// 向新的收集器发送上一次发送的元素
replay = 1,
// 在有活跃的订阅者时,保持生产者的活跃状态
started = SharingStarted.WhileSubscribed()
)
codelab: 创建 Kotlin 扩展库
https://developer.android.google.cn/codelabs/building-kotlin-extensions-library
推荐阅读